---
title: Job System
sidebarTitle: Jobs
---

The job system powers long-running operations in Spacedrive. It provides automatic persistence, progress tracking, and graceful interruption handling for tasks like indexing, file processing, and sync operations.

Jobs execute asynchronously with minimal boilerplate. They persist their state to survive crashes and resume where they left off. The system integrates with Spacedrive's task executor for efficient resource usage.

## Core Concepts

A job represents a resumable unit of work. Jobs report progress, handle interruptions, and maintain state across executions. The system manages job lifecycles automatically.

<Note>
Jobs are library-scoped. Each library maintains its own job database and execution queue.
</Note>

### Job Lifecycle

Jobs transition through defined states during execution:

<Steps>
<Step title="Queued">
Job created and waiting for execution. Initial state after dispatch.
</Step>
<Step title="Running">
Job actively executing. Progress updates flow to subscribers.
</Step>
<Step title="Paused">
Job interrupted but resumable. State persisted to database.
</Step>
<Step title="Completed">
Job finished successfully. Moved to history table.
</Step>
</Steps>

Failed or cancelled jobs cannot resume. The system distinguishes between recoverable interruptions and permanent failures.
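
The lifecycle above can be modelled as a small state machine. The sketch below is illustrative only: the `JobStatus` enum, the `Failed` and `Cancelled` variant names, and the exact set of allowed transitions are assumptions, not the definitions used in `core/src/infra/job`.

```rust
/// Hypothetical status enum mirroring the states described above.
/// `Failed` and `Cancelled` are assumed names for the non-resumable outcomes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    Queued,
    Running,
    Paused,
    Completed,
    Failed,
    Cancelled,
}

impl JobStatus {
    /// Terminal states never transition again.
    pub fn is_terminal(self) -> bool {
        matches!(self, Self::Completed | Self::Failed | Self::Cancelled)
    }

    /// Only a paused job can be picked up again.
    pub fn is_resumable(self) -> bool {
        self == Self::Paused
    }

    /// Returns true when moving from `self` to `next` is allowed in this sketch.
    pub fn can_transition_to(self, next: JobStatus) -> bool {
        use JobStatus::*;
        match (self, next) {
            // A queued job either starts or is cancelled before it runs.
            (Queued, Running) | (Queued, Cancelled) => true,
            // A running job can pause, finish, fail, or be cancelled.
            (Running, Paused) | (Running, Completed) => true,
            (Running, Failed) | (Running, Cancelled) => true,
            // A paused job resumes or is cancelled.
            (Paused, Running) | (Paused, Cancelled) => true,
            // Everything else, including leaving a terminal state, is rejected.
            _ => false,
        }
    }
}
```

Encoding the transitions in one function keeps the rule "failed or cancelled jobs cannot resume" in a single place that callers can check before updating the database.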

### Key Components

The job system consists of several interconnected parts:

**Job Manager** coordinates all job operations. It maintains the job database, tracks running jobs, and handles lifecycle transitions. Located at `core/src/infra/job/manager.rs`.

**Job Registry** enables automatic job discovery. Jobs register themselves at compile time using the derive macro. The registry creates jobs dynamically from saved state. See `core/src/infra/job/registry.rs`.

**Job Context** provides the execution environment. Jobs access the database, report progress, and interact with services through the context. Implementation in `core/src/infra/job/context.rs`.

**Job Executor** bridges jobs with the task system. It manages interruption signals and checkpoint operations. Found at `core/src/infra/job/executor.rs`.
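
To illustrate how a registry can rebuild jobs from saved state, here is a minimal sketch that maps job names to constructor functions. It is a simplification under stated assumptions: `ResumableJob`, `JobRegistry`, `CountJob`, and `restore_count_job` are hypothetical names, registration happens at runtime rather than at compile time through a derive macro, and the saved state is raw bytes instead of a serialized struct.

```rust
use std::collections::HashMap;

/// Minimal stand-in for a resumable job; the real traits carry more.
pub trait ResumableJob {
    fn name(&self) -> &'static str;
    fn describe(&self) -> String;
}

/// A constructor rebuilds a job from saved bytes, or returns None if malformed.
type Constructor = fn(&[u8]) -> Option<Box<dyn ResumableJob>>;

#[derive(Default)]
pub struct JobRegistry {
    constructors: HashMap<&'static str, Constructor>,
}

impl JobRegistry {
    /// Associate a job name with the function that restores it.
    pub fn register(&mut self, name: &'static str, ctor: Constructor) {
        self.constructors.insert(name, ctor);
    }

    /// Look up the constructor by name and rebuild the job from saved state.
    pub fn create(&self, name: &str, saved: &[u8]) -> Option<Box<dyn ResumableJob>> {
        let ctor = self.constructors.get(name)?;
        ctor(saved)
    }
}

/// Example job whose whole state is a single counter.
pub struct CountJob {
    processed: u32,
}

impl ResumableJob for CountJob {
    fn name(&self) -> &'static str {
        "count"
    }

    fn describe(&self) -> String {
        format!("count job at {}", self.processed)
    }
}

/// Restore a `CountJob` from exactly four little-endian bytes.
pub fn restore_count_job(saved: &[u8]) -> Option<Box<dyn ResumableJob>> {
    if saved.len() != 4 {
        return None;
    }
    let mut bytes = [0u8; 4];
    bytes.copy_from_slice(saved);
    Some(Box::new(CountJob { processed: u32::from_le_bytes(bytes) }))
}
```

Calling `registry.create("count", &saved_bytes)` returns a boxed job ready to continue, while an unknown name or malformed state yields `None` instead of a panic.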

## Defining Jobs

Jobs implement two traits: `Job` for metadata and `JobHandler` for execution logic.

```rust
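// `Job`, `JobHandler`, `JobContext`, and `JobError` are assumed to be in scope
// from the job module under `core/src/infra/job`.
use std::path::PathBuf;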
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
struct ProcessFilesJob {
    location_id: i32,
    files: Vec<PathBuf>,
    processed: usize,
}

#[typetag::serde(name = "process_files")]
impl Job for ProcessFilesJob {
    const NAME: &'static str = "process_files";
    const VERSION: u32 = 1;
    const IS_RESUMABLE: bool = true;
}

#[async_trait::async_trait]
impl JobHandler for ProcessFilesJob {
    async fn run(&mut self, ctx: &JobContext) -> Result<Vec<i32>, JobError> {
        // Skip already processed files when resuming
        let files_to_process = &self.files[self.processed..];
        
        for file in files_to_process {
            // Check for interruption before starting each file
            if ctx.check_interrupted().await? {
                return Err(JobError::Interrupted);
            }

            // Process file
            process_file(file, ctx).await?;

            // Update progress
            self.processed += 1;
            ctx.report_count_progress(self.processed, self.files.len()).await;

            // Save a checkpoint every 100 files, counted from the start of the
            // job so the cadence stays stable across resumes
            if self.processed % 100 == 0 {
                ctx.checkpoint().await?;
            }
        }
        
        Ok(vec![])
    }
}
```

<Info>
The `#[typetag::serde]` attribute enables polymorphic serialization. Jobs must be serializable to support resumption.
</Info>

### Progress Reporting

Jobs communicate progress through the context. The system supports multiple progress types:

```rust
// Count-based progress
ctx.report_count_progress(current, total).await;

// Percentage progress
ctx.report_percentage_progress(0.75).await;

// Bytes processed
ctx.report_bytes_progress(processed_bytes, total_bytes).await;

// Custom structured data
ctx.report_structured_progress("phase", json!({
    "stage": "analyzing",
    "files_found": 1500
})).await;
```

Progress updates are throttled automatically: the system batches them to limit database writes.
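
The throttling idea can be sketched with a small time-based gate. This is not the job system's implementation: `ProgressThrottle` and `should_emit` are hypothetical names, and the 2-second interval in the usage note is taken from the figure given under Performance Considerations.

```rust
use std::time::{Duration, Instant};

/// Lets an update through only if `min_interval` has elapsed since the last one.
pub struct ProgressThrottle {
    min_interval: Duration,
    last_emit: Option<Instant>,
}

impl ProgressThrottle {
    pub fn new(min_interval: Duration) -> Self {
        Self { min_interval, last_emit: None }
    }

    /// Returns true when the caller should persist or broadcast the update
    /// observed at time `now`. Passing `now` in keeps the logic testable.
    pub fn should_emit(&mut self, now: Instant) -> bool {
        let due = match self.last_emit {
            // The first update always goes out.
            None => true,
            // Later updates wait until the interval has elapsed.
            Some(last) => now.saturating_duration_since(last) >= self.min_interval,
        };
        if due {
            self.last_emit = Some(now);
        }
        due
    }
}
```

With `ProgressThrottle::new(Duration::from_secs(2))`, a job that reports progress thousands of times per second would trigger at most one write every two seconds. A production version would also need to flush the final update when the job ends, which this sketch omits.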

### Error Handling

Jobs distinguish between recoverable and permanent errors:

```rust
// Recoverable - job can resume
if network_unavailable() {
    return Err(JobError::Interrupted);
}

// Permanent failure - job cannot resume
if corrupt_data() {
    return Err(JobError::Critical(
        "Data corruption detected".into()
    ));
}

// Non-critical errors accumulate
ctx.report_non_critical_error("Skipped locked file").await;
```

<Warning>
Always check `ctx.check_interrupted()` in loops. This enables graceful shutdown and pause operations.
</Warning>
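
The distinction between recoverable and permanent errors determines where a job ends up. The following sketch shows one way to map a run result to a final state. The `JobError` enum here is a reduced local mirror of the variants used above, and `Outcome` and `classify` are hypothetical names introduced for illustration.

```rust
/// Reduced mirror of the error variants shown above.
#[derive(Debug, PartialEq)]
pub enum JobError {
    Interrupted,
    Critical(String),
}

/// Assumed final states; the names are illustrative.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Finished, possibly with accumulated non-critical errors.
    Completed { warnings: usize },
    /// Interrupted but resumable.
    Paused,
    /// Permanent failure with the reported reason.
    Failed(String),
}

/// Map the result of a run, plus any non-critical errors, to a final state.
pub fn classify(result: Result<(), JobError>, non_critical: &[String]) -> Outcome {
    match result {
        Ok(()) => Outcome::Completed { warnings: non_critical.len() },
        Err(JobError::Interrupted) => Outcome::Paused,
        Err(JobError::Critical(reason)) => Outcome::Failed(reason),
    }
}
```

Non-critical errors never change the outcome in this sketch; they only travel along with a completed job so they can be shown to the user afterwards.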

## Dispatching Jobs

The job manager provides typed and dynamic dispatch methods:

```rust
// Typed dispatch
let handle = manager.dispatch(ProcessFilesJob {
    location_id: 1,
    files: vec![path1, path2],
    processed: 0,
}).await?;

// Dynamic dispatch by name
let handle = manager.dispatch_by_name(
    "process_files",
    json!({
        "location_id": 1,
        "files": ["path1", "path2"],
        "processed": 0
    })
).await?;

// Wait for completion
let result = handle.wait().await?;
```

Job handles provide status monitoring and progress streaming:

```rust
// Subscribe to status changes
let mut status_rx = handle.status();
while let Ok(status) = status_rx.recv().await {
    match status {
        JobStatus::Running => println!("Job started"),
        JobStatus::Completed => break,
        _ => {}
    }
}

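// Stream progress updates. Run this in its own task (or select over both
// receivers): placed directly after the loop above, it would only start
// once the job has already completed.
let mut progress_rx = handle.progress();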
while let Ok(progress) = progress_rx.recv().await {
    if let Some(count) = progress.count {
        println!("Processed {}/{}", count.current, count.total);
    }
}
```

## Database Schema

Jobs persist to a dedicated SQLite database (`jobs.db`) with three tables:

<ResponseField name="jobs" type="table">
Active job records containing:
<Expandable title="columns">
<ResponseField name="id" type="UUID">
Unique job identifier
</ResponseField>
<ResponseField name="name" type="TEXT">
Job type name for registry lookup
</ResponseField>
<ResponseField name="version" type="INTEGER">
Schema version for migrations
</ResponseField>
<ResponseField name="status" type="TEXT">
Current job state
</ResponseField>
<ResponseField name="data" type="BLOB">
MessagePack-serialized job state
</ResponseField>
<ResponseField name="progress" type="BLOB">
Latest progress snapshot
</ResponseField>
<ResponseField name="metrics" type="TEXT">
Performance statistics JSON
</ResponseField>
</Expandable>
</ResponseField>

<ResponseField name="job_history" type="table">
Completed jobs moved here for audit trails
</ResponseField>

<ResponseField name="job_checkpoints" type="table">
Resumption checkpoints for long-running jobs
</ResponseField>

## Advanced Features

### Job Versioning

Jobs specify versions for schema evolution:

```rust
impl Job for DataMigrationJob {
    const VERSION: u32 = 2; // Increment when schema changes
}
```

The registry validates versions during resumption. Saved jobs with an incompatible version fail to load.
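
A version check of this kind can be as small as the sketch below. It assumes strict equality between the saved version and the version compiled into the job type. The actual compatibility rule may be more permissive, and `LoadError` and `check_version` are hypothetical names.

```rust
/// Reasons a saved job might be rejected in this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum LoadError {
    VersionMismatch { saved: u32, current: u32 },
}

/// Reject saved state whose version differs from the job type's current version.
pub fn check_version(saved: u32, current: u32) -> Result<(), LoadError> {
    if saved == current {
        Ok(())
    } else {
        Err(LoadError::VersionMismatch { saved, current })
    }
}
```

Returning both numbers in the error makes it possible to log exactly which saved jobs were written by an older build.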

### Extension Jobs

The system supports WASM-based extension jobs:

```rust
manager.dispatch_extension_job(
    extension_id,
    job_name,
    job_data
).await?;
```

Extensions run in isolated contexts with limited capabilities.

### Performance Considerations

The job system optimizes for throughput and resumability:

- Progress updates batch at 2-second intervals
- Checkpoints save incrementally
- Database operations use prepared statements
- Channels use bounded capacity to prevent memory growth

<Tip>
For high-frequency operations, batch work into larger chunks. This reduces checkpoint overhead and improves performance.
</Tip>
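
To show why bounded channels cap memory growth, the sketch below uses the standard library's `sync_channel` as a stand-in for whatever channel type the job system actually uses. The helper names `progress_channel` and `offer`, and the policy of dropping updates when the buffer is full, are assumptions made for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Create a progress channel that buffers at most `capacity` updates.
pub fn progress_channel(capacity: usize) -> (SyncSender<u64>, Receiver<u64>) {
    sync_channel(capacity)
}

/// Try to queue a progress value without blocking.
/// Returns true if the value was queued, false if it was dropped.
pub fn offer(tx: &SyncSender<u64>, value: u64) -> bool {
    match tx.try_send(value) {
        Ok(()) => true,
        // Buffer full: the consumer is behind, so drop this update.
        Err(TrySendError::Full(_)) => false,
        // Receiver gone: nobody is listening any more.
        Err(TrySendError::Disconnected(_)) => false,
    }
}
```

Because the buffer never holds more than `capacity` items, a slow subscriber costs dropped updates rather than unbounded memory. Dropping is acceptable for progress values since a newer update supersedes an older one.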

## Integration Points

Jobs integrate with core Spacedrive systems:

**Task System**: Jobs execute as tasks with configurable priority. The executor handles work distribution across threads.

**Event System**: State changes emit events for UI updates. Subscribe to `JOB_MANAGER_EVENTS` for notifications.

**Action System**: User actions spawn jobs with audit context. The system tracks who initiated operations.

**Library System**: Each library maintains independent job state. Jobs cannot access cross-library data.

## Common Patterns

### Batch Processing

Process items in chunks for efficiency:

```rust
const BATCH_SIZE: usize = 1000;

// Resume from the batch index saved at the last checkpoint
for (batch_idx, chunk) in items.chunks(BATCH_SIZE).enumerate().skip(self.batch_idx) {
    if ctx.check_interrupted().await? {
        return Err(JobError::Interrupted);
    }

    process_batch(chunk).await?;

    ctx.report_count_progress(
        batch_idx * BATCH_SIZE + chunk.len(),
        items.len()
    ).await;

    // Record the next batch to run before persisting state
    self.batch_idx = batch_idx + 1;
    ctx.checkpoint().await?;
}
```
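
The resume behaviour of a batched job can be checked in isolation with a synchronous, std-only simulation. Everything in this sketch is hypothetical scaffolding: `BatchState` stands for the fields a job would persist at a checkpoint, and `interrupt_at` simulates an interruption signal arriving before a given batch.

```rust
/// Marker error for a simulated interruption.
#[derive(Debug, PartialEq)]
pub struct Interrupted;

/// State that would be serialized at each checkpoint.
#[derive(Debug, Default)]
pub struct BatchState {
    /// Index of the next batch to run.
    pub next_batch: usize,
    /// Items handled so far, kept to verify nothing is processed twice.
    pub processed: Vec<u32>,
}

/// Process `items` in batches of `batch_size` (must be non-zero),
/// resuming at `state.next_batch`.
pub fn run_batches(
    items: &[u32],
    batch_size: usize,
    state: &mut BatchState,
    interrupt_at: Option<usize>,
) -> Result<(), Interrupted> {
    let start = state.next_batch;
    for (batch_idx, chunk) in items.chunks(batch_size).enumerate().skip(start) {
        // Simulated interruption check before each batch.
        if interrupt_at == Some(batch_idx) {
            return Err(Interrupted);
        }
        // Stand-in for real work.
        state.processed.extend_from_slice(chunk);
        // A checkpoint would persist this value.
        state.next_batch = batch_idx + 1;
    }
    Ok(())
}
```

Running the function once with an interruption and again without one, on the same `BatchState`, processes every item exactly once. That is the property a resumable batch job needs to preserve.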

### Phased Execution

Split complex jobs into phases:

```rust
// Loop so a fresh run flows through every phase, while a resumed run
// starts at whichever phase was last checkpointed
loop {
    match self.phase {
        Phase::Discovery => {
            self.items = discover_items(ctx).await?;
            self.phase = Phase::Processing;
            ctx.checkpoint().await?;
        }
        Phase::Processing => {
            process_items(&mut self.items, ctx).await?;
            self.phase = Phase::Cleanup;
            ctx.checkpoint().await?;
        }
        Phase::Cleanup => {
            cleanup_resources(ctx).await?;
            break;
        }
    }
}
```
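
A phased job is a state machine whose current phase is part of the saved state. The synchronous sketch below shows how a driver loop lets a fresh run visit every phase while a resumed run skips the phases already finished. The `Done` phase, the `PhasedJob` struct, and its `log` field are assumptions added to make the behaviour observable.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Discovery,
    Processing,
    Cleanup,
    /// Assumed terminal marker so the driver loop knows when to stop.
    Done,
}

#[derive(Debug)]
pub struct PhasedJob {
    pub phase: Phase,
    pub items: Vec<u32>,
    /// Phases executed during this run, recorded for inspection.
    pub log: Vec<Phase>,
}

impl PhasedJob {
    pub fn new() -> Self {
        Self { phase: Phase::Discovery, items: Vec::new(), log: Vec::new() }
    }

    /// Run phases until done; each iteration handles one phase and advances.
    pub fn run(&mut self) {
        while self.phase != Phase::Done {
            self.log.push(self.phase);
            self.phase = match self.phase {
                Phase::Discovery => {
                    // Stand-in for discovering work items.
                    self.items = vec![1, 2, 3];
                    Phase::Processing
                }
                Phase::Processing => {
                    // Stand-in for processing: double every item.
                    for item in &mut self.items {
                        *item *= 2;
                    }
                    Phase::Cleanup
                }
                Phase::Cleanup => Phase::Done,
                Phase::Done => Phase::Done,
            };
            // A checkpoint here would persist `phase` and `items`.
        }
    }
}
```

Constructing a `PhasedJob` with `phase: Phase::Processing` and pre-filled `items` mimics a job restored from a checkpoint taken after discovery.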

### Child Jobs

Spawn dependent jobs (feature in development):

```rust
let child_ids = ctx.spawn_children(vec![
    AnalyzeFileJob { path: file1 },
    AnalyzeFileJob { path: file2 },
]).await?;

ctx.wait_for_children(child_ids).await?;
```

## Debugging

Enable file-based logging for troubleshooting:

```rust
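// Set this early, before other threads are spawned. In the Rust 2024
// edition, `set_var` must be wrapped in an `unsafe` block.
std::env::set_var("SD_JOBS_FILE_LOG", "1");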
```

Logs write to `.spacedrive/jobs/{job_id}.log` with detailed execution traces.

Monitor job metrics through the context:

```rust
let metrics = ctx.get_metrics();
println!("Execution time: {}s", metrics.elapsed_seconds);
println!("Memory used: {}MB", metrics.memory_mb);
```

<Danger>
Never block the job executor thread. Use `tokio::task::spawn_blocking` for blocking or CPU-intensive work.
</Danger>

The job system provides the foundation for reliable background processing in Spacedrive. Its resumable design ensures operations complete despite interruptions, while the progress system keeps users informed of ongoing work.